package gecko.impl;

import gecko.GeckoClient;
import gecko.GeckoScoped;
import gecko.VirtualTrigger;
import gecko.lang.TypedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Future;

/**
 * UdpServer触发器，可接收UDP数据包，经过处理后，返回UDP数据；
 *
 * @author 陈永佳 (yoojiachen@gmail.com)
 * @version 0.0.1
 */
public abstract class UdpServerTrigger implements VirtualTrigger {

    private final static Logger LOGGER = LoggerFactory.getLogger(UdpServerTrigger.class);

    private int mPort;
    private int mBuffSize;
    private Future<?> mServerFuture;

    private Adapter mAdapter;

    @Override
    public void onInit(TypedMap initArgs, GeckoScoped scoped) {
        mPort = initArgs.getInt("port", GeckoClient.UDP_PORT);
        mBuffSize = initArgs.getInt("bufferSizeKB", 1) * 1024;
    }

    @Override
    public void onStart(GeckoScoped scoped, ContextInvoker invoker) {
        LOGGER.info("服务启动，绑定端口： {}", mPort);
        // 在独立线程中运行UDP服务端
        final DatagramSocket udpSocket;
        try {
            udpSocket = new DatagramSocket(mPort);
        } catch (SocketException e) {
            LOGGER.error("UDP Socket服务发生错误", e);
            throw new RuntimeException(e);
        }
        mServerFuture = scoped.submit(() -> {
            Objects.requireNonNull(mAdapter, "Adapter is required, was null");
            final byte[] packBuffer = new byte[mBuffSize];
            final DatagramPacket packet = new DatagramPacket(packBuffer, packBuffer.length);
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    udpSocket.receive(packet);
                } catch (IOException ioe) {
                    LOGGER.error("UDP Socket读取数据错误", ioe);
                    continue;
                }
                final SocketAddress address = packet.getSocketAddress();
                final Callback callback = (topic, json) -> {
                    final byte[] bytes = mAdapter.encode(address, json);
                    try {
                        udpSocket.send(new DatagramPacket(bytes, bytes.length, address));
                    } catch (IOException ioe) {
                        LOGGER.error("UDP Socket返回数据错误", ioe);
                    }
                };
                final byte[] data = new byte[packet.getLength()];
                ByteBuffer.wrap(packet.getData()).get(data);
                mAdapter.decode(address, data).forEach(evt -> {
                    invoker.invoke(evt, callback);
                });
            }
            LOGGER.info("服务停止，绑定端口： {}", mPort);
        });
    }

    @Override
    public void onStop(GeckoScoped scoped, ContextInvoker invoker) {
        if (mServerFuture != null) {
            mServerFuture.cancel(true);
        }
    }

    protected void setAdapter(Adapter adapter) {
        mAdapter = adapter;
    }

}
